package com.tian.mqtt;

import com.tian.factory.ApplicationContextFactory;
import com.tian.mqtt.enums.MqttTopicEnum;
import com.tian.mqtt.process.BaseProcess;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * Dispatcher for subscription callback notifications.
 * <p>
 * Resolves the processor bean registered for an MQTT topic and hands it to a shared,
 * statically initialized thread pool for execution.
 *
 * @author tianwc  WeChat official account: Java Backend Full-Stack Technology, Interview Column
 * @version 1.0.0
 * 2023-11-07 10:02
 * Online practice with 1200+ questions and 1000+ in-depth articles: <a href="https://woaijava.cc/">blog</a>
 */
@Slf4j
public class ReqDispatcher {
    /** Number of processors reported by the runtime; used as both core and max pool size. */
    private static final int CPU_CORE = Runtime.getRuntime().availableProcessors();
    /**
     * Prefix for worker thread names. The executor appends its own running counter to the
     * prefix, so it must not contain a format placeholder such as {@code %d}.
     */
    private static final String THREAD_PREFIX = "thread-call-runner-";
    /** Keep-alive time, in seconds, configured on the pool. */
    private static final int KEEP_ALIVE_SECONDS = (int) TimeUnit.SECONDS.toSeconds(30);
    /** Maximum number of tasks that may wait in the pool's queue. */
    private static final int QUEUE_CAPACITY = 1000;

    /** Shared pool on which all topic processors are executed. */
    private static final ThreadPoolTaskExecutor EXECUTOR = createExecutor();

    /**
     * Builds and initializes the shared executor.
     *
     * @return an initialized executor, ready to accept tasks
     */
    private static ThreadPoolTaskExecutor createExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(CPU_CORE);
        pool.setMaxPoolSize(CPU_CORE);
        pool.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        pool.setQueueCapacity(QUEUE_CAPACITY);
        pool.setThreadNamePrefix(THREAD_PREFIX);
        // When the pool cannot accept a task, run it on the submitting thread instead of dropping it.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.initialize();
        return pool;
    }

    /**
     * Looks up the processor bean for {@code topic}, attaches the topic and message to it,
     * and submits it to the shared executor. If the lookup yields {@code null}, an error is
     * logged and the message is not processed.
     * <p>
     * NOTE(review): the topic and message are stored on the bean returned by the application
     * context. If that bean is a shared singleton, concurrent calls for the same topic could
     * overwrite each other's state before the task runs — confirm the bean scope.
     *
     * @param topic       name of the MQTT topic the message arrived on
     * @param mqttMessage the received message
     */
    public static void submit(String topic, MqttMessage mqttMessage) {
        BaseProcess baseProcess = (BaseProcess) ApplicationContextFactory.getBean(MqttTopicEnum.getBeanClassByTopicName(topic));
        if (baseProcess == null) {
            log.error("topic =【{}】 is no exist !", topic);
            return;
        }
        baseProcess.setTopic(topic);
        baseProcess.setMqttMessage(mqttMessage);
        EXECUTOR.submit(baseProcess);
    }
}
